import 'dart:async';

import 'package:rxdart/rxdart.dart';

import '../../common/test_page.dart';

/// Emits three consecutive integers starting at [value] after an initial
/// pause of [delay] milliseconds.
///
/// The pause starts when the stream is first listened to, because an
/// `async*` body does not run before that. Once the pause is over, [value],
/// `value + 1` and `value + 2` are yielded in order and the stream closes.
Stream<int> getDelayedStream(int delay, int value) async* {
  // A delayed future replaces the manual Completer/Timer pair.
  await Future<void>.delayed(Duration(milliseconds: delay));

  for (var offset = 0; offset < 3; offset++) {
    yield value + offset;
  }
}

/// Test page exercising `Rx.race` from rxdart.
///
/// Every case is declared through `test` in the constructor. Cases that
/// attach a listener wait for the relevant stream event before their body
/// returns, so the associated `expect` calls happen while the test body is
/// still running rather than after it has already completed.
class RaceTestPage extends TestPage {
  /// Declares all `Rx.race` cases when the page is created.
  RaceTestPage(super.title) {
    test('Rx.race', () async {
      final first = getDelayedStream(50, 1),
          second = getDelayedStream(60, 2),
          last = getDelayedStream(70, 3);
      var expected = 1;

      // Drain the raced stream inside the test body; a fire-and-forget
      // listener would only receive its first event after the body returned.
      await for (final result in Rx.race([first, second, last])) {
        // test to see if the combined output matches
        expect(result.compareTo(expected++));
      }
    });

    test('Rx.race.iterate.once', () async {
      var iterationCount = 0;

      // The generator bumps the counter every time the iterable is walked.
      final stream = Rx.race<int>(() sync* {
        ++iterationCount;
        yield Stream.value(1);
        yield Stream.value(2);
        yield Stream.value(3);
      }());

      expect(stream);
      expect(iterationCount);
    });

    test('Rx.race.single.subscription', () async {
      final first = getDelayedStream(50, 1);

      final stream = Rx.race([first]);

      // First subscription; the second attempt is handed to `expect` as a
      // closure rather than being invoked here.
      stream.listen(null);
      expect(() => stream.listen(null));
    });

    test('Rx.race.asBroadcastStream', () async {
      final first = getDelayedStream(50, 1),
          second = getDelayedStream(60, 2),
          last = getDelayedStream(70, 3);

      final stream = Rx.race([first, second, last]).asBroadcastStream();

      // listen twice on same stream
      stream.listen(null);
      stream.listen(null);
      // code should reach here
      expect(stream.isBroadcast);
    });

    test('Rx.race.shouldThrowB', () async {
      final stream = Rx.race([Stream<void>.error(Exception('oh noes!'))]);
      final done = Completer<void>();

      // Listen once and hand every forwarded error to `expect`; wait for the
      // stream to close so the error callback has run before the test ends.
      stream.listen(
        null,
        onError: (Object e, StackTrace s) => expect(e),
        onDone: done.complete,
      );

      await done.future;
    });

    test('Rx.race.pause.resume', () async {
      final first = getDelayedStream(50, 1),
          second = getDelayedStream(60, 2),
          last = getDelayedStream(70, 3);

      // Carries the first delivered value back into the test body.
      final received = Completer<int>();

      late StreamSubscription<int> subscription;
      // ignore: deprecated_member_use
      subscription = Rx.race([first, second, last]).listen((value) {
        // Cancel first so the listener can only ever fire once.
        subscription.cancel();
        received.complete(value);
      });

      // Stay paused for 80 ms; the subscription resumes when the future
      // passed to `pause` completes.
      subscription
          .pause(Future<void>.delayed(const Duration(milliseconds: 80)));

      expect(await received.future);
    });

    test('Rx.race.empty', () {
      expect(Rx.race<int>(const []));
    });

    test('Rx.race.single', () {
      expect(Rx.race<int>([Stream.value(1)]));
    });

    test('Rx.race.cancel.throws', () async {
      // Builds a race in which one competitor throws from its cancel handler.
      Stream<int> stream() {
        final controller = StreamController<int>();
        controller.onCancel = () async {
          throw Exception('Exception when cancelling!');
        };

        return Rx.race<int>([
          controller.stream,
          Rx.concat([
            Rx.timer(1, const Duration(milliseconds: 100)),
            Rx.timer(2, const Duration(milliseconds: 100)),
          ]),
        ]);
      }

      expect(stream());

      expect(stream().take(1));
    });
  }
}